package com.atguigu.edu.realtime.app.dwd.db;

import com.atguigu.edu.realtime.util.MyKafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Course topic, order placement (single order) detail job; table #11 of the DWD layer.
 * The original author marked this job as verified running.
 *
 * <p>The job selects {@code insert} change records of the {@code order_detail} table from
 * the {@code topic_db} source table, projects a fixed set of fields, and writes the rows to
 * the {@code dwd_interaction_order_comment} sink table.
 *
 * <p>NOTE(review): the class and sink are named "order comment" while the query reads
 * {@code order_detail} rows; confirm the naming is intentional.
 */
public class DwdInteractionOrderComment {

    /**
     * Name of the sink table. The same string is also passed to both
     * {@code MyKafkaUtil} helpers in {@link #main(String[])}.
     */
    private static final String SINK_TABLE = "dwd_interaction_order_comment";

    /** Name under which the filtered order-detail query result is registered as a view. */
    private static final String ORDER_DETAIL_VIEW = "order_detail";

    /**
     * Projection over {@code topic_db}: keeps only rows whose {@code table} is
     * {@code order_detail} and whose {@code type} is {@code insert}. {@code date_id} is
     * {@code create_time} formatted as {@code yyyy-MM-dd}.
     */
    private static final String ORDER_DETAIL_QUERY =
            "select\n"
                    + "data['id'] id,\n"
                    + "data['user_id'] user_id,\n"
                    + "data['course_id'] course_id,\n"
                    + "data['course_name'] course_name,\n"
                    + "data['final_amount'] final_amount,\n"
                    + "date_format(data['create_time'],'yyyy-MM-dd') date_id,\n"
                    + "data['create_time'] create_time,\n"
                    + "ts\n"
                    + "from topic_db\n"
                    + "where `table` = 'order_detail'\n"
                    + "and `type` = 'insert'\n";

    /**
     * Column list of the sink table, keyed by {@code id}. The connector options are
     * appended at runtime from {@code MyKafkaUtil.getUpsertKafkaDDL}.
     */
    private static final String SINK_TABLE_COLUMNS_DDL =
            "create table " + SINK_TABLE + " (\n"
                    + "id string,\n"
                    + "user_id string,\n"
                    + "course_id string,\n"
                    + "course_name string,\n"
                    + "final_amount string,\n"
                    + "date_id string,\n"
                    + "create_time string,\n"
                    + "ts string,\n"
                    + "primary key(id) not enforced\n"
                    + ")";

    /**
     * Copies every column of the view into the sink. Because of {@code select *}, the
     * view's column order has to line up with the sink's column list.
     */
    private static final String INSERT_INTO_SINK_SQL =
            "insert into " + SINK_TABLE + " select * from " + ORDER_DETAIL_VIEW;

    /**
     * Entry point: builds the table environment, registers source and sink tables, and
     * issues the insert statement.
     *
     * @param args command-line arguments; not read by this job
     * @throws Exception declared by the original signature and kept for compatibility
     */
    public static void main(String[] args) throws Exception {

        // Step 1: set up the streaming environment with a parallelism of 4, and the
        // table environment on top of it.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(4);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);

        // Step 2: run the source DDL produced by MyKafkaUtil. The query in step 3 expects
        // it to define a table named topic_db with `data`, `table`, `type` and `ts`
        // columns (presumably Kafka-backed; confirm in MyKafkaUtil).
        String topicDbDdl = MyKafkaUtil.getTopicDbDDL(SINK_TABLE);
        tEnv.executeSql(topicDbDdl);

        // Step 3: select the order_detail insert records and expose them as a view.
        Table orderDetailRows = tEnv.sqlQuery(ORDER_DETAIL_QUERY);
        tEnv.createTemporaryView(ORDER_DETAIL_VIEW, orderDetailRows);

        // Step 4: create the dwd_interaction_order_comment sink table; the connector
        // clause comes from MyKafkaUtil.getUpsertKafkaDDL.
        String sinkDdl = SINK_TABLE_COLUMNS_DDL + MyKafkaUtil.getUpsertKafkaDDL(SINK_TABLE);
        tEnv.executeSql(sinkDdl);

        // Step 5: write the view's rows into the sink table.
        tEnv.executeSql(INSERT_INTO_SINK_SQL);
    }
}
